1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.junit.Assert.assertNotNull;
20  import static org.junit.Assert.assertTrue;
21  import static org.junit.Assert.fail;
22  import static org.mockito.Matchers.any;
23  import static org.mockito.Matchers.anyInt;
24  import static org.mockito.Matchers.anyString;
25  import static org.mockito.Matchers.isA;
26  import static org.mockito.Mockito.inOrder;
27  import static org.mockito.Mockito.mock;
28  import static org.mockito.Mockito.never;
29  import static org.mockito.Mockito.times;
30  import static org.mockito.Mockito.verify;
31  
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.LinkedList;
35  import java.util.List;
36  import java.util.NoSuchElementException;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import org.junit.Before;
43  import org.junit.Test;
44  import org.mockito.InOrder;
45  import org.mockito.Mock;
46  import org.mockito.MockitoAnnotations;
47  
48  import rx.Observable.OnSubscribe;
49  import rx.Observable.Transformer;
50  import rx.exceptions.OnErrorNotImplementedException;
51  import rx.functions.Action1;
52  import rx.functions.Action2;
53  import rx.functions.Func0;
54  import rx.functions.Func1;
55  import rx.functions.Func2;
56  import rx.observables.ConnectableObservable;
57  import rx.observers.TestSubscriber;
58  import rx.schedulers.TestScheduler;
59  import rx.subjects.ReplaySubject;
60  import rx.subjects.Subject;
61  import rx.subscriptions.BooleanSubscription;
62  
63  public class ObservableTests {
64  
65      @Mock
66      Observer<Integer> w;
67  
68      private static final Func1<Integer, Boolean> IS_EVEN = new Func1<Integer, Boolean>() {
69          @Override
70          public Boolean call(Integer value) {
71              return value % 2 == 0;
72          }
73      };
74  
75      @Before
76      public void before() {
77          MockitoAnnotations.initMocks(this);
78      }
79  
80      @Test
81      public void fromArray() {
82          String[] items = new String[] { "one", "two", "three" };
83          assertEquals(new Integer(3), Observable.from(items).count().toBlocking().single());
84          assertEquals("two", Observable.from(items).skip(1).take(1).toBlocking().single());
85          assertEquals("three", Observable.from(items).takeLast(1).toBlocking().single());
86      }
87  
88      @Test
89      public void fromIterable() {
90          ArrayList<String> items = new ArrayList<String>();
91          items.add("one");
92          items.add("two");
93          items.add("three");
94  
95          assertEquals(new Integer(3), Observable.from(items).count().toBlocking().single());
96          assertEquals("two", Observable.from(items).skip(1).take(1).toBlocking().single());
97          assertEquals("three", Observable.from(items).takeLast(1).toBlocking().single());
98      }
99  
100     @Test
101     public void fromArityArgs3() {
102         Observable<String> items = Observable.just("one", "two", "three");
103 
104         assertEquals(new Integer(3), items.count().toBlocking().single());
105         assertEquals("two", items.skip(1).take(1).toBlocking().single());
106         assertEquals("three", items.takeLast(1).toBlocking().single());
107     }
108 
109     @Test
110     public void fromArityArgs1() {
111         Observable<String> items = Observable.just("one");
112 
113         assertEquals(new Integer(1), items.count().toBlocking().single());
114         assertEquals("one", items.takeLast(1).toBlocking().single());
115     }
116 
117     @Test
118     public void testCreate() {
119 
120         Observable<String> observable = Observable.create(new OnSubscribe<String>() {
121 
122             @Override
123             public void call(Subscriber<? super String> Observer) {
124                 Observer.onNext("one");
125                 Observer.onNext("two");
126                 Observer.onNext("three");
127                 Observer.onCompleted();
128             }
129 
130         });
131 
132         @SuppressWarnings("unchecked")
133         Observer<String> observer = mock(Observer.class);
134         observable.subscribe(observer);
135         verify(observer, times(1)).onNext("one");
136         verify(observer, times(1)).onNext("two");
137         verify(observer, times(1)).onNext("three");
138         verify(observer, never()).onError(any(Throwable.class));
139         verify(observer, times(1)).onCompleted();
140     }
141 
142     @Test
143     public void testCountAFewItems() {
144         Observable<String> observable = Observable.just("a", "b", "c", "d");
145         observable.count().subscribe(w);
146         // we should be called only once
147         verify(w, times(1)).onNext(anyInt());
148         verify(w).onNext(4);
149         verify(w, never()).onError(any(Throwable.class));
150         verify(w, times(1)).onCompleted();
151     }
152 
153     @Test
154     public void testCountZeroItems() {
155         Observable<String> observable = Observable.empty();
156         observable.count().subscribe(w);
157         // we should be called only once
158         verify(w, times(1)).onNext(anyInt());
159         verify(w).onNext(0);
160         verify(w, never()).onError(any(Throwable.class));
161         verify(w, times(1)).onCompleted();
162     }
163 
164     @Test
165     public void testCountError() {
166         Observable<String> o = Observable.create(new OnSubscribe<String>() {
167             @Override
168             public void call(Subscriber<? super String> obsv) {
169                 obsv.onError(new RuntimeException());
170             }
171         });
172         o.count().subscribe(w);
173         verify(w, never()).onNext(anyInt());
174         verify(w, never()).onCompleted();
175         verify(w, times(1)).onError(any(RuntimeException.class));
176     }
177 
178     public void testTakeFirstWithPredicateOfSome() {
179         Observable<Integer> observable = Observable.just(1, 3, 5, 4, 6, 3);
180         observable.takeFirst(IS_EVEN).subscribe(w);
181         verify(w, times(1)).onNext(anyInt());
182         verify(w).onNext(4);
183         verify(w, times(1)).onCompleted();
184         verify(w, never()).onError(any(Throwable.class));
185     }
186 
187     @Test
188     public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
189         Observable<Integer> observable = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
190         observable.takeFirst(IS_EVEN).subscribe(w);
191         verify(w, never()).onNext(anyInt());
192         verify(w, times(1)).onCompleted();
193         verify(w, never()).onError(any(Throwable.class));
194     }
195 
196     @Test
197     public void testTakeFirstOfSome() {
198         Observable<Integer> observable = Observable.just(1, 2, 3);
199         observable.take(1).subscribe(w);
200         verify(w, times(1)).onNext(anyInt());
201         verify(w).onNext(1);
202         verify(w, times(1)).onCompleted();
203         verify(w, never()).onError(any(Throwable.class));
204     }
205 
206     @Test
207     public void testTakeFirstOfNone() {
208         Observable<Integer> observable = Observable.empty();
209         observable.take(1).subscribe(w);
210         verify(w, never()).onNext(anyInt());
211         verify(w, times(1)).onCompleted();
212         verify(w, never()).onError(any(Throwable.class));
213     }
214 
215     @Test
216     public void testFirstOfNone() {
217         Observable<Integer> observable = Observable.empty();
218         observable.first().subscribe(w);
219         verify(w, never()).onNext(anyInt());
220         verify(w, never()).onCompleted();
221         verify(w, times(1)).onError(isA(NoSuchElementException.class));
222     }
223 
224     @Test
225     public void testFirstWithPredicateOfNoneMatchingThePredicate() {
226         Observable<Integer> observable = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
227         observable.first(IS_EVEN).subscribe(w);
228         verify(w, never()).onNext(anyInt());
229         verify(w, never()).onCompleted();
230         verify(w, times(1)).onError(isA(NoSuchElementException.class));
231     }
232 
233     @Test
234     public void testReduce() {
235         Observable<Integer> observable = Observable.just(1, 2, 3, 4);
236         observable.reduce(new Func2<Integer, Integer, Integer>() {
237 
238             @Override
239             public Integer call(Integer t1, Integer t2) {
240                 return t1 + t2;
241             }
242 
243         }).subscribe(w);
244         // we should be called only once
245         verify(w, times(1)).onNext(anyInt());
246         verify(w).onNext(10);
247     }
248 
249     /**
250      * A reduce should fail with an NoSuchElementException if done on an empty Observable.
251      */
252     @Test(expected = NoSuchElementException.class)
253     public void testReduceWithEmptyObservable() {
254         Observable<Integer> observable = Observable.range(1, 0);
255         observable.reduce(new Func2<Integer, Integer, Integer>() {
256 
257             @Override
258             public Integer call(Integer t1, Integer t2) {
259                 return t1 + t2;
260             }
261 
262         }).toBlocking().forEach(new Action1<Integer>() {
263 
264             @Override
265             public void call(Integer t1) {
266                 // do nothing ... we expect an exception instead
267             }
268         });
269 
270         fail("Expected an exception to be thrown");
271     }
272 
273     /**
274      * A reduce on an empty Observable and a seed should just pass the seed through.
275      * 
276      * This is confirmed at https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642456
277      */
278     @Test
279     public void testReduceWithEmptyObservableAndSeed() {
280         Observable<Integer> observable = Observable.range(1, 0);
281         int value = observable.reduce(1, new Func2<Integer, Integer, Integer>() {
282 
283             @Override
284             public Integer call(Integer t1, Integer t2) {
285                 return t1 + t2;
286             }
287 
288         }).toBlocking().last();
289 
290         assertEquals(1, value);
291     }
292 
293     @Test
294     public void testReduceWithInitialValue() {
295         Observable<Integer> observable = Observable.just(1, 2, 3, 4);
296         observable.reduce(50, new Func2<Integer, Integer, Integer>() {
297 
298             @Override
299             public Integer call(Integer t1, Integer t2) {
300                 return t1 + t2;
301             }
302 
303         }).subscribe(w);
304         // we should be called only once
305         verify(w, times(1)).onNext(anyInt());
306         verify(w).onNext(60);
307     }
308 
309     @Test
310     public void testOnSubscribeFails() {
311         @SuppressWarnings("unchecked")
312         Observer<String> observer = mock(Observer.class);
313         final RuntimeException re = new RuntimeException("bad impl");
314         Observable<String> o = Observable.create(new OnSubscribe<String>() {
315 
316             @Override
317             public void call(Subscriber<? super String> t1) {
318                 throw re;
319             }
320 
321         });
322         o.subscribe(observer);
323         verify(observer, times(0)).onNext(anyString());
324         verify(observer, times(0)).onCompleted();
325         verify(observer, times(1)).onError(re);
326     }
327 
328     @Test
329     public void testMaterializeDematerializeChaining() {
330         Observable<Integer> obs = Observable.just(1);
331         Observable<Integer> chained = obs.materialize().dematerialize();
332 
333         @SuppressWarnings("unchecked")
334         Observer<Integer> observer = mock(Observer.class);
335         chained.subscribe(observer);
336 
337         verify(observer, times(1)).onNext(1);
338         verify(observer, times(1)).onCompleted();
339         verify(observer, times(0)).onError(any(Throwable.class));
340     }
341 
342     /**
343      * The error from the user provided Observer is not handled by the subscribe method try/catch.
344      * 
345      * It is handled by the AtomicObserver that wraps the provided Observer.
346      * 
347      * Result: Passes (if AtomicObserver functionality exists)
348      */
349     @Test
350     public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
351         final CountDownLatch latch = new CountDownLatch(1);
352         final AtomicInteger count = new AtomicInteger();
353         final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
354         Observable.create(new OnSubscribe<String>() {
355 
356             @Override
357             public void call(final Subscriber<? super String> observer) {
358                 final BooleanSubscription s = new BooleanSubscription();
359                 new Thread(new Runnable() {
360 
361                     @Override
362                     public void run() {
363                         try {
364                             if (!s.isUnsubscribed()) {
365                                 observer.onNext("1");
366                                 observer.onNext("2");
367                                 observer.onNext("three");
368                                 observer.onNext("4");
369                                 observer.onCompleted();
370                             }
371                         } finally {
372                             latch.countDown();
373                         }
374                     }
375                 }).start();
376             }
377         }).subscribe(new Subscriber<String>() {
378             @Override
379             public void onCompleted() {
380                 System.out.println("completed");
381             }
382 
383             @Override
384             public void onError(Throwable e) {
385                 error.set(e);
386                 System.out.println("error");
387                 e.printStackTrace();
388             }
389 
390             @Override
391             public void onNext(String v) {
392                 int num = Integer.parseInt(v);
393                 System.out.println(num);
394                 // doSomething(num);
395                 count.incrementAndGet();
396             }
397 
398         });
399 
400         // wait for async sequence to complete
401         latch.await();
402 
403         assertEquals(2, count.get());
404         assertNotNull(error.get());
405         if (!(error.get() instanceof NumberFormatException)) {
406             fail("It should be a NumberFormatException");
407         }
408     }
409 
410     /**
411      * The error from the user provided Observer is handled by the subscribe try/catch because this is synchronous
412      * 
413      * Result: Passes
414      */
415     @Test
416     public void testCustomObservableWithErrorInObserverSynchronous() {
417         final AtomicInteger count = new AtomicInteger();
418         final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
419         Observable.create(new OnSubscribe<String>() {
420 
421             @Override
422             public void call(Subscriber<? super String> observer) {
423                 observer.onNext("1");
424                 observer.onNext("2");
425                 observer.onNext("three");
426                 observer.onNext("4");
427                 observer.onCompleted();
428             }
429         }).subscribe(new Subscriber<String>() {
430 
431             @Override
432             public void onCompleted() {
433                 System.out.println("completed");
434             }
435 
436             @Override
437             public void onError(Throwable e) {
438                 error.set(e);
439                 System.out.println("error");
440                 e.printStackTrace();
441             }
442 
443             @Override
444             public void onNext(String v) {
445                 int num = Integer.parseInt(v);
446                 System.out.println(num);
447                 // doSomething(num);
448                 count.incrementAndGet();
449             }
450 
451         });
452         assertEquals(2, count.get());
453         assertNotNull(error.get());
454         if (!(error.get() instanceof NumberFormatException)) {
455             fail("It should be a NumberFormatException");
456         }
457     }
458 
459     /**
460      * The error from the user provided Observable is handled by the subscribe try/catch because this is synchronous
461      * 
462      * 
463      * Result: Passes
464      */
465     @Test
466     public void testCustomObservableWithErrorInObservableSynchronous() {
467         final AtomicInteger count = new AtomicInteger();
468         final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
469         Observable.create(new OnSubscribe<String>() {
470 
471             @Override
472             public void call(Subscriber<? super String> observer) {
473                 observer.onNext("1");
474                 observer.onNext("2");
475                 throw new NumberFormatException();
476             }
477         }).subscribe(new Subscriber<String>() {
478 
479             @Override
480             public void onCompleted() {
481                 System.out.println("completed");
482             }
483 
484             @Override
485             public void onError(Throwable e) {
486                 error.set(e);
487                 System.out.println("error");
488                 e.printStackTrace();
489             }
490 
491             @Override
492             public void onNext(String v) {
493                 System.out.println(v);
494                 count.incrementAndGet();
495             }
496 
497         });
498         assertEquals(2, count.get());
499         assertNotNull(error.get());
500         if (!(error.get() instanceof NumberFormatException)) {
501             fail("It should be a NumberFormatException");
502         }
503     }
504 
505     @Test
506     public void testPublishLast() throws InterruptedException {
507         final AtomicInteger count = new AtomicInteger();
508         ConnectableObservable<String> connectable = Observable.create(new OnSubscribe<String>() {
509             @Override
510             public void call(final Subscriber<? super String> observer) {
511                 count.incrementAndGet();
512                 new Thread(new Runnable() {
513                     @Override
514                     public void run() {
515                         observer.onNext("first");
516                         observer.onNext("last");
517                         observer.onCompleted();
518                     }
519                 }).start();
520             }
521         }).takeLast(1).publish();
522 
523         // subscribe once
524         final CountDownLatch latch = new CountDownLatch(1);
525         connectable.subscribe(new Action1<String>() {
526             @Override
527             public void call(String value) {
528                 assertEquals("last", value);
529                 latch.countDown();
530             }
531         });
532 
533         // subscribe twice
534         connectable.subscribe(new Action1<String>() {
535             @Override
536             public void call(String ignored) {
537             }
538         });
539 
540         Subscription subscription = connectable.connect();
541         assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
542         assertEquals(1, count.get());
543         subscription.unsubscribe();
544     }
545 
546     @Test
547     public void testReplay() throws InterruptedException {
548         final AtomicInteger counter = new AtomicInteger();
549         ConnectableObservable<String> o = Observable.create(new OnSubscribe<String>() {
550 
551             @Override
552             public void call(final Subscriber<? super String> observer) {
553                 new Thread(new Runnable() {
554 
555                     @Override
556                     public void run() {
557                         counter.incrementAndGet();
558                         observer.onNext("one");
559                         observer.onCompleted();
560                     }
561                 }).start();
562             }
563         }).replay();
564 
565         // we connect immediately and it will emit the value
566         Subscription s = o.connect();
567         try {
568 
569             // we then expect the following 2 subscriptions to get that same value
570             final CountDownLatch latch = new CountDownLatch(2);
571 
572             // subscribe once
573             o.subscribe(new Action1<String>() {
574 
575                 @Override
576                 public void call(String v) {
577                     assertEquals("one", v);
578                     latch.countDown();
579                 }
580             });
581 
582             // subscribe again
583             o.subscribe(new Action1<String>() {
584 
585                 @Override
586                 public void call(String v) {
587                     assertEquals("one", v);
588                     latch.countDown();
589                 }
590             });
591 
592             if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
593                 fail("subscriptions did not receive values");
594             }
595             assertEquals(1, counter.get());
596         } finally {
597             s.unsubscribe();
598         }
599     }
600 
601     @Test
602     public void testCache() throws InterruptedException {
603         final AtomicInteger counter = new AtomicInteger();
604         Observable<String> o = Observable.create(new OnSubscribe<String>() {
605 
606             @Override
607             public void call(final Subscriber<? super String> observer) {
608                 new Thread(new Runnable() {
609 
610                     @Override
611                     public void run() {
612                         counter.incrementAndGet();
613                         observer.onNext("one");
614                         observer.onCompleted();
615                     }
616                 }).start();
617             }
618         }).cache();
619 
620         // we then expect the following 2 subscriptions to get that same value
621         final CountDownLatch latch = new CountDownLatch(2);
622 
623         // subscribe once
624         o.subscribe(new Action1<String>() {
625 
626             @Override
627             public void call(String v) {
628                 assertEquals("one", v);
629                 latch.countDown();
630             }
631         });
632 
633         // subscribe again
634         o.subscribe(new Action1<String>() {
635 
636             @Override
637             public void call(String v) {
638                 assertEquals("one", v);
639                 latch.countDown();
640             }
641         });
642 
643         if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
644             fail("subscriptions did not receive values");
645         }
646         assertEquals(1, counter.get());
647     }
648 
649     @Test
650     public void testCacheWithCapacity() throws InterruptedException {
651         final AtomicInteger counter = new AtomicInteger();
652         Observable<String> o = Observable.create(new OnSubscribe<String>() {
653 
654             @Override
655             public void call(final Subscriber<? super String> observer) {
656                 new Thread(new Runnable() {
657 
658                     @Override
659                     public void run() {
660                         counter.incrementAndGet();
661                         observer.onNext("one");
662                         observer.onCompleted();
663                     }
664                 }).start();
665             }
666         }).cache(1);
667 
668         // we then expect the following 2 subscriptions to get that same value
669         final CountDownLatch latch = new CountDownLatch(2);
670 
671         // subscribe once
672         o.subscribe(new Action1<String>() {
673 
674             @Override
675             public void call(String v) {
676                 assertEquals("one", v);
677                 latch.countDown();
678             }
679         });
680 
681         // subscribe again
682         o.subscribe(new Action1<String>() {
683 
684             @Override
685             public void call(String v) {
686                 assertEquals("one", v);
687                 latch.countDown();
688             }
689         });
690 
691         if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
692             fail("subscriptions did not receive values");
693         }
694         assertEquals(1, counter.get());
695     }
696 
697     /**
698      * https://github.com/ReactiveX/RxJava/issues/198
699      * 
700      * Rx Design Guidelines 5.2
701      * 
702      * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
703      * to rethrow the exception on the thread that the message comes out from the Observable.
704      * The OnCompleted behavior in this case is to do nothing."
705      */
706     @Test
707     public void testErrorThrownWithoutErrorHandlerSynchronous() {
708         try {
709             Observable.error(new RuntimeException("failure")).subscribe(new Action1<Object>() {
710 
711                 @Override
712                 public void call(Object t1) {
713                     // won't get anything
714                 }
715 
716             });
717             fail("expected exception");
718         } catch (Throwable e) {
719             assertEquals("failure", e.getMessage());
720         }
721     }
722 
723     /**
724      * https://github.com/ReactiveX/RxJava/issues/198
725      * 
726      * Rx Design Guidelines 5.2
727      * 
728      * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
729      * to rethrow the exception on the thread that the message comes out from the Observable.
730      * The OnCompleted behavior in this case is to do nothing."
731      * 
732      * @throws InterruptedException
733      */
734     @Test
735     public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException {
736         final CountDownLatch latch = new CountDownLatch(1);
737         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
738         Observable.create(new OnSubscribe<String>() {
739 
740             @Override
741             public void call(final Subscriber<? super String> observer) {
742                 new Thread(new Runnable() {
743 
744                     @Override
745                     public void run() {
746                         try {
747                             observer.onError(new Error("failure"));
748                         } catch (Throwable e) {
749                             // without an onError handler it has to just throw on whatever thread invokes it
750                             exception.set(e);
751                         }
752                         latch.countDown();
753                     }
754                 }).start();
755             }
756         }).subscribe(new Action1<String>() {
757 
758             @Override
759             public void call(String t1) {
760 
761             }
762 
763         });
764         // wait for exception
765         latch.await(3000, TimeUnit.MILLISECONDS);
766         assertNotNull(exception.get());
767         assertEquals("failure", exception.get().getMessage());
768     }
769 
770     @Test
771     public void testTakeWithErrorInObserver() {
772         final AtomicInteger count = new AtomicInteger();
773         final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
774         Observable.just("1", "2", "three", "4").take(3).subscribe(new Subscriber<String>() {
775 
776             @Override
777             public void onCompleted() {
778                 System.out.println("completed");
779             }
780 
781             @Override
782             public void onError(Throwable e) {
783                 error.set(e);
784                 System.out.println("error");
785                 e.printStackTrace();
786             }
787 
788             @Override
789             public void onNext(String v) {
790                 int num = Integer.parseInt(v);
791                 System.out.println(num);
792                 // doSomething(num);
793                 count.incrementAndGet();
794             }
795 
796         });
797         assertEquals(2, count.get());
798         assertNotNull(error.get());
799         if (!(error.get() instanceof NumberFormatException)) {
800             fail("It should be a NumberFormatException");
801         }
802     }
803 
804     @Test
805     public void testOfType() {
806         Observable<String> observable = Observable.just(1, "abc", false, 2L).ofType(String.class);
807 
808         @SuppressWarnings("unchecked")
809         Observer<Object> observer = mock(Observer.class);
810         observable.subscribe(observer);
811         verify(observer, never()).onNext(1);
812         verify(observer, times(1)).onNext("abc");
813         verify(observer, never()).onNext(false);
814         verify(observer, never()).onNext(2L);
815         verify(observer, never()).onError(
816                 org.mockito.Matchers.any(Throwable.class));
817         verify(observer, times(1)).onCompleted();
818     }
819 
820     @Test
821     public void testOfTypeWithPolymorphism() {
822         ArrayList<Integer> l1 = new ArrayList<Integer>();
823         l1.add(1);
824         LinkedList<Integer> l2 = new LinkedList<Integer>();
825         l2.add(2);
826 
827         @SuppressWarnings("rawtypes")
828         Observable<List> observable = Observable.<Object> just(l1, l2, "123").ofType(List.class);
829 
830         @SuppressWarnings("unchecked")
831         Observer<Object> observer = mock(Observer.class);
832         observable.subscribe(observer);
833         verify(observer, times(1)).onNext(l1);
834         verify(observer, times(1)).onNext(l2);
835         verify(observer, never()).onNext("123");
836         verify(observer, never()).onError(
837                 org.mockito.Matchers.any(Throwable.class));
838         verify(observer, times(1)).onCompleted();
839     }
840 
841     @Test
842     public void testContains() {
843         Observable<Boolean> observable = Observable.just("a", "b", null).contains("b");
844 
845         @SuppressWarnings("unchecked")
846         Observer<Object> observer = mock(Observer.class);
847         observable.subscribe(observer);
848         verify(observer, times(1)).onNext(true);
849         verify(observer, never()).onNext(false);
850         verify(observer, never()).onError(
851                 org.mockito.Matchers.any(Throwable.class));
852         verify(observer, times(1)).onCompleted();
853     }
854 
855     @Test
856     public void testContainsWithInexistence() {
857         Observable<Boolean> observable = Observable.just("a", "b", null).contains("c");
858 
859         @SuppressWarnings("unchecked")
860         Observer<Object> observer = mock(Observer.class);
861         observable.subscribe(observer);
862         verify(observer, times(1)).onNext(false);
863         verify(observer, never()).onNext(true);
864         verify(observer, never()).onError(
865                 org.mockito.Matchers.any(Throwable.class));
866         verify(observer, times(1)).onCompleted();
867     }
868 
869     @Test
870     public void testContainsWithNull() {
871         Observable<Boolean> observable = Observable.just("a", "b", null).contains(null);
872 
873         @SuppressWarnings("unchecked")
874         Observer<Object> observer = mock(Observer.class);
875         observable.subscribe(observer);
876         verify(observer, times(1)).onNext(true);
877         verify(observer, never()).onNext(false);
878         verify(observer, never()).onError(
879                 org.mockito.Matchers.any(Throwable.class));
880         verify(observer, times(1)).onCompleted();
881     }
882 
883     @Test
884     public void testContainsWithEmptyObservable() {
885         Observable<Boolean> observable = Observable.<String> empty().contains("a");
886 
887         @SuppressWarnings("unchecked")
888         Observer<Object> observer = mock(Observer.class);
889         observable.subscribe(observer);
890         verify(observer, times(1)).onNext(false);
891         verify(observer, never()).onNext(true);
892         verify(observer, never()).onError(
893                 org.mockito.Matchers.any(Throwable.class));
894         verify(observer, times(1)).onCompleted();
895     }
896 
897     @Test
898     public void testIgnoreElements() {
899         Observable<Integer> observable = Observable.just(1, 2, 3).ignoreElements();
900 
901         @SuppressWarnings("unchecked")
902         Observer<Integer> observer = mock(Observer.class);
903         observable.subscribe(observer);
904         verify(observer, never()).onNext(any(Integer.class));
905         verify(observer, never()).onError(any(Throwable.class));
906         verify(observer, times(1)).onCompleted();
907     }
908 
909     @Test
910             public void testJustWithScheduler() {
911                 TestScheduler scheduler = new TestScheduler();
912                 Observable<Integer> observable = Observable.from(Arrays.asList(1, 2)).subscribeOn(scheduler);
913         
914                 @SuppressWarnings("unchecked")
915                 Observer<Integer> observer = mock(Observer.class);
916                 observable.subscribe(observer);
917         
918                 scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
919         
920                 InOrder inOrder = inOrder(observer);
921                 inOrder.verify(observer, times(1)).onNext(1);
922                 inOrder.verify(observer, times(1)).onNext(2);
923                 inOrder.verify(observer, times(1)).onCompleted();
924                 inOrder.verifyNoMoreInteractions();
925             }
926 
927     @Test
928     public void testStartWithWithScheduler() {
929         TestScheduler scheduler = new TestScheduler();
930         Observable<Integer> observable = Observable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler);
931 
932         @SuppressWarnings("unchecked")
933         Observer<Integer> observer = mock(Observer.class);
934         observable.subscribe(observer);
935 
936         scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
937 
938         InOrder inOrder = inOrder(observer);
939         inOrder.verify(observer, times(1)).onNext(1);
940         inOrder.verify(observer, times(1)).onNext(2);
941         inOrder.verify(observer, times(1)).onNext(3);
942         inOrder.verify(observer, times(1)).onNext(4);
943         inOrder.verify(observer, times(1)).onCompleted();
944         inOrder.verifyNoMoreInteractions();
945     }
946 
947     @Test
948     public void testRangeWithScheduler() {
949         TestScheduler scheduler = new TestScheduler();
950         Observable<Integer> observable = Observable.range(3, 4, scheduler);
951 
952         @SuppressWarnings("unchecked")
953         Observer<Integer> observer = mock(Observer.class);
954         observable.subscribe(observer);
955 
956         scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
957 
958         InOrder inOrder = inOrder(observer);
959         inOrder.verify(observer, times(1)).onNext(3);
960         inOrder.verify(observer, times(1)).onNext(4);
961         inOrder.verify(observer, times(1)).onNext(5);
962         inOrder.verify(observer, times(1)).onNext(6);
963         inOrder.verify(observer, times(1)).onCompleted();
964         inOrder.verifyNoMoreInteractions();
965     }
966 
967     @Test
968     public void testCollectToList() {
969         Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {
970 
971             @Override
972             public List<Integer> call() {
973                 return new ArrayList<Integer>();
974             }
975             
976         }, new Action2<List<Integer>, Integer>() {
977 
978             @Override
979             public void call(List<Integer> list, Integer v) {
980                 list.add(v);
981             }
982         });
983         
984         List<Integer> list =  o.toBlocking().last();
985 
986         assertEquals(3, list.size());
987         assertEquals(1, list.get(0).intValue());
988         assertEquals(2, list.get(1).intValue());
989         assertEquals(3, list.get(2).intValue());
990         
991         // test multiple subscribe
992         List<Integer> list2 =  o.toBlocking().last();
993 
994         assertEquals(3, list2.size());
995         assertEquals(1, list2.get(0).intValue());
996         assertEquals(2, list2.get(1).intValue());
997         assertEquals(3, list2.get(2).intValue());
998     }
999 
1000     @Test
1001     public void testCollectToString() {
1002         String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
1003 
1004             @Override
1005             public StringBuilder call() {
1006                 return new StringBuilder();
1007             }
1008             
1009         }, new Action2<StringBuilder, Integer>() {
1010 
1011             @Override
1012             public void call(StringBuilder sb, Integer v) {
1013                 if (sb.length() > 0) {
1014                     sb.append("-");
1015                 }
1016                 sb.append(v);
1017             }
1018         }).toBlocking().last().toString();
1019 
1020         assertEquals("1-2-3", value);
1021     }
1022     
1023     @Test
1024     public void testMergeWith() {
1025         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1026         Observable.just(1).mergeWith(Observable.just(2)).subscribe(ts);
1027         ts.assertReceivedOnNext(Arrays.asList(1, 2));
1028     }
1029     
1030     @Test
1031     public void testConcatWith() {
1032         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1033         Observable.just(1).concatWith(Observable.just(2)).subscribe(ts);
1034         ts.assertReceivedOnNext(Arrays.asList(1, 2));
1035     }
1036     
1037     @Test
1038     public void testAmbWith() {
1039         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1040         Observable.just(1).ambWith(Observable.just(2)).subscribe(ts);
1041         ts.assertReceivedOnNext(Arrays.asList(1));
1042     }
1043 
1044     @Test(expected = OnErrorNotImplementedException.class)
1045     public void testSubscribeWithoutOnError() {
1046         Observable<String> o = Observable.just("a", "b").flatMap(new Func1<String, Observable<String>>() {
1047             @Override
1048             public Observable<String> call(String s) {
1049                 return Observable.error(new Exception("test"));
1050             }
1051         });
1052         o.subscribe();
1053     }
1054 
1055     @Test
1056     public void testTakeWhileToList() {
1057         final int expectedCount = 3;
1058         final AtomicInteger count = new AtomicInteger();
1059         for (int i = 0;i < expectedCount; i++) {
1060             Observable
1061                     .just(Boolean.TRUE, Boolean.FALSE)
1062                     .takeWhile(new Func1<Boolean, Boolean>() {
1063                         @Override
1064                         public Boolean call(Boolean value) {
1065                             return value;
1066                         }
1067                     })
1068                     .toList()
1069                     .doOnNext(new Action1<List<Boolean>>() {
1070                         @Override
1071                         public void call(List<Boolean> booleans) {
1072                             count.incrementAndGet();
1073                         }
1074                     })
1075                     .subscribe();
1076         }
1077         assertEquals(expectedCount, count.get());
1078     }
1079     
1080     @Test
1081     public void testCompose() {
1082         TestSubscriber<String> ts = new TestSubscriber<String>();
1083         Observable.just(1, 2, 3).compose(new Transformer<Integer, String>() {
1084 
1085             @Override
1086             public Observable<String> call(Observable<Integer> t1) {
1087                 return t1.map(new Func1<Integer, String>() {
1088                     
1089                     @Override
1090                     public String call(Integer t1) {
1091                         return String.valueOf(t1);
1092                     }
1093                     
1094                 });
1095             }
1096             
1097         }).subscribe(ts);
1098         ts.assertTerminalEvent();
1099         ts.assertNoErrors();
1100         ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
1101     }
1102     
1103     @Test
1104     public void testErrorThrownIssue1685() {
1105         Subject<Object, Object> subject = ReplaySubject.create();
1106 
1107         Observable.error(new RuntimeException("oops"))
1108             .materialize()
1109             .delay(1, TimeUnit.SECONDS)
1110             .dematerialize()
1111             .subscribe(subject);
1112 
1113         subject.subscribe();
1114         subject.materialize().toBlocking().first();
1115 
1116         System.out.println("Done");
1117     }
1118 
1119     @Test
1120     public void testEmptyIdentity() {
1121         assertEquals(Observable.empty(), Observable.empty());
1122     }
1123     
1124     @Test
1125     public void testEmptyIsEmpty() {
1126         Observable.<Integer>empty().subscribe(w);
1127         
1128         verify(w).onCompleted();
1129         verify(w, never()).onNext(any(Integer.class));
1130         verify(w, never()).onError(any(Throwable.class));
1131     }
1132     
1133     @Test // cf. https://github.com/ReactiveX/RxJava/issues/2599
1134     public void testSubscribingSubscriberAsObserverMaintainsSubscriptionChain() {
1135         TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
1136         Subscription subscription = Observable.just("event").subscribe((Observer<Object>) subscriber);
1137         subscription.unsubscribe();
1138 
1139         subscriber.assertUnsubscribed();
1140     }
1141 }